---
title: Dagster with PySpark | Dagster
---

# Using Dagster with PySpark

This guide includes two examples:

- [Running PySpark code in solids](#running-pyspark-code-in-solids) demonstrates the basic usage of the [`dagster-pyspark`](/\_apidocs/libraries/dagster_pyspark) package.
- [Submitting PySpark solids on EMR](#submitting-pyspark-solids-on-emr) shows how to have a solid run as a Spark step on an EMR cluster.

## Running PySpark code in solids

<CodeReferenceLink filePath="examples/basic_pyspark" />

Passing PySpark DataFrames between solids requires a little extra care, for a couple of reasons:

- Spark has a lazy execution model, which means that PySpark won't process any data until an action like `write` or `collect` is called on a DataFrame.
- PySpark DataFrames cannot be pickled, which means that [IO Managers](/concepts/io-management/io-managers) that pickle their outputs, like the `fs_io_manager`, won't work for them.
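
To make the first point concrete, here is a minimal pure-Python sketch of lazy evaluation. `ToyLazyFrame` is a hypothetical stand-in written for this illustration, not the PySpark API: `filter` only records the predicate, and no row is examined until `collect` is called.

```python
class ToyLazyFrame:
    """Toy stand-in for a lazily evaluated DataFrame (hypothetical, not PySpark)."""

    def __init__(self, rows, predicates=()):
        self._rows = rows
        self._predicates = tuple(predicates)

    def filter(self, predicate):
        # A transformation: record the predicate and return a new plan.
        return ToyLazyFrame(self._rows, self._predicates + (predicate,))

    def collect(self):
        # An action: the first point at which any row is examined.
        return [row for row in self._rows if all(p(row) for p in self._predicates)]


calls = []


def over_50(row):
    calls.append(row["name"])  # record each evaluation so laziness is visible
    return row["age"] > 50


people = ToyLazyFrame(
    [
        {"name": "Thom", "age": 51},
        {"name": "Jonny", "age": 48},
        {"name": "Nigel", "age": 49},
    ]
)

filtered = people.filter(over_50)
calls_before_action = list(calls)  # still empty: nothing has run yet
result = filtered.collect()  # the predicate runs here, once per row
```

The same pattern applies to the example below: a solid that returns a filtered DataFrame has not computed anything yet, and the work happens when the IO manager calls `write` on it.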

In this example, we've defined an IO manager that knows how to store and retrieve PySpark DataFrames that are produced and consumed by solids.

This example assumes that all the outputs within the pipeline will be PySpark DataFrames and stored in the same way. To learn how to use different IO managers for different outputs within the same pipeline, take a look at the [IO Manager](/concepts/io-management/io-managers) concept page.

This example writes out DataFrames to the local file system, but can be tweaked to write to cloud object stores like S3 by changing the `write` and `read` invocations.
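
One way to make that tweak is to build the storage path from a configurable root, so the same IO manager can target a local directory or a bucket. The sketch below is an assumption-laden illustration, not part of the example project: `build_parquet_path` is a hypothetical helper, `my-example-bucket` is a placeholder bucket name, and the `s3a://` scheme assumes the Spark installation has an S3 connector and credentials configured.

```python
def build_parquet_path(base_uri, run_id, step_key, name):
    """Join a storage root and an output's identifiers into one path.

    Hypothetical helper: joins with "/" explicitly, because os.path.join
    would use a backslash on Windows, which is wrong for URIs.
    """
    return "/".join([base_uri.rstrip("/"), run_id, step_key, name])


# Local file system root.
local_path = build_parquet_path("/tmp/dagster", "run-1", "make_people", "result")

# Object store root (placeholder bucket; assumes Spark can resolve s3a:// URIs).
s3_path = build_parquet_path(
    "s3a://my-example-bucket/dagster", "run-1", "make_people", "result"
)

# Inside an IO manager, _get_path could then return something like:
#   build_parquet_path(self._base_uri, context.run_id, context.step_key, context.name)
# where _base_uri is an assumed attribute set when the IO manager is constructed.
```

With this shape, the `obj.write.parquet(...)` and `spark.read.parquet(...)` calls stay the same, and only the root passed to the helper changes.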

```python file=../../basic_pyspark/repo.py startafter=start_repo_marker_0 endbefore=end_repo_marker_0
import os

from dagster import IOManager, ModeDefinition, io_manager, pipeline, repository, solid
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


class LocalParquetStore(IOManager):
    def _get_path(self, context):
        return os.path.join(context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context.upstream_output))


@io_manager
def local_parquet_store(_):
    return LocalParquetStore()


@solid
def make_people(_):
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(rows, schema)


@solid
def filter_over_50(_, people):
    return people.filter(people["age"] > 50)


@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": local_parquet_store})])
def my_pipeline():
    filter_over_50(make_people())
```

## Submitting PySpark solids on EMR

<CodeReferenceLink filePath="examples/emr_pyspark" />

This example demonstrates how to have a solid run as a Spark step on an EMR cluster. In it, each
of the three solids will be executed as a separate EMR step on the same EMR cluster.

```python file=../../emr_pyspark/repo.py startafter=start-snippet endbefore=end-snippet
from pathlib import Path

from dagster import (
    ModeDefinition,
    make_python_type_usable_as_dagster_type,
    pipeline,
    repository,
    solid,
)
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
from dagster_pyspark import DataFrame as DagsterPySparkDataFrame
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Make pyspark.sql.DataFrame map to dagster_pyspark.DataFrame
make_python_type_usable_as_dagster_type(python_type=DataFrame, dagster_type=DagsterPySparkDataFrame)


@solid(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)


@solid(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(_, people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


@solid(required_resource_keys={"pyspark_step_launcher"})
def count_people(_, people: DataFrame) -> int:
    return people.count()


emr_mode = ModeDefinition(
    name="emr",
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
            {
                "cluster_id": {"env": "EMR_CLUSTER_ID"},
                "local_pipeline_package_path": str(Path(__file__).parent),
                "deploy_local_pipeline_package": True,
                "region_name": "us-west-1",
                "staging_bucket": "my_staging_bucket",
                "wait_for_logs": True,
            }
        ),
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "2g"}}),
        "s3": s3_resource,
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "my_staging_bucket", "s3_prefix": "simple-pyspark"}
        ),
    },
)

local_mode = ModeDefinition(
    name="local",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.default.parallelism": 1}}),
    },
)


@pipeline(mode_defs=[emr_mode, local_mode])
def my_pipeline():
    count_people(filter_over_50(make_people()))
```

It accomplishes this by using the `emr_pyspark_step_launcher`, which knows how to launch an EMR step
that runs the contents of a solid. The example's `emr` mode links the resource key
`"pyspark_step_launcher"` to the `emr_pyspark_step_launcher` resource definition, and each solid that
should be launched remotely requires that resource key. The `local` mode links the same key to
`no_step_launcher`, so the same solids run in-process when that mode is selected.

The EMR PySpark step launcher relies on S3 to shuttle config and events to and from EMR.

More generally, a step launcher is any resource that extends the `StepLauncher` abstract class.
Where a solid would otherwise be executed in-process, the step launcher's methods are invoked
instead to launch a remote process with the solid running inside it. To use a step launcher for a
particular solid, set a required resource key for the solid that points to that resource.
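
The pattern can be sketched in plain Python. This is a conceptual illustration only: `ToyStepLauncher`, its `launch` method, and the `MODES` table are hypothetical names invented here and do not reflect Dagster's actual `StepLauncher` interface. A worker thread stands in for the remote process.

```python
import threading
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor


class ToyStepLauncher(ABC):
    """Hypothetical interface: decides where a step's function runs."""

    @abstractmethod
    def launch(self, step_fn, *args):
        ...


class InProcessLauncher(ToyStepLauncher):
    def launch(self, step_fn, *args):
        # Run the step directly in the calling thread.
        return step_fn(*args)


class WorkerThreadLauncher(ToyStepLauncher):
    def launch(self, step_fn, *args):
        # Stand-in for "somewhere else": run the step on a worker thread
        # and block until its result comes back.
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(step_fn, *args).result()


# Each mode binds the same resource key to a different launcher.
MODES = {
    "local": {"step_launcher": InProcessLauncher()},
    "remote": {"step_launcher": WorkerThreadLauncher()},
}


def count_over_50(ages):
    # Return the count plus the name of the thread that did the work.
    return sum(1 for age in ages if age > 50), threading.current_thread().name


def run_step(mode, step_fn, *args):
    # The step only names the key; the selected mode decides the implementation.
    return MODES[mode]["step_launcher"].launch(step_fn, *args)


local_count, local_thread = run_step("local", count_over_50, [51, 48, 49])
remote_count, remote_thread = run_step("remote", count_over_50, [51, 48, 49])
```

The step function is identical in both modes. Only the launcher bound to the key changes, which is the same separation the `emr` and `local` modes provide in the example above.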
